Redis缓存穿透/击穿/雪崩以及数据一致性的解决方案 您所在的位置:网站首页 redis 启动集群 Redis缓存穿透/击穿/雪崩以及数据一致性的解决方案

Redis缓存穿透/击穿/雪崩以及数据一致性的解决方案

2023-05-31 08:44| 来源: 网络整理| 查看: 265

无论是在开发过程中还是在准备跑路的面试过程中,有关redis相关的,难免会涉及到四个特殊场景:缓存穿透、缓存雪崩、缓存击穿以及数据一致性。如果在开发中不注意这些场景的话,在高并发场景下有可能会导致系统崩溃,数据错乱等情况。现在,结合实际的业务场景来复现并解决这些问题。

相关技术:springboot2.2.2+mybatisplus3.1+redis5.0+hutool5.8

缓存穿透

缓存穿透是指查询缓存和数据库中都不存在的数据,导致所有的查询压力全部给到了数据库。

image.png 比如查询一篇文章信息并对其进行缓存,一般的逻辑是先查询缓存中是否存在该文章,如果存在则直接返回,否则再查询数据库并将查询结果进行缓存。

@Slf4j @Service public class DocumentInfoServiceImpl extends ServiceImpl implements DocumentInfoService { @Resource private StringRedisTemplate stringRedisTemplate; @Override public DocumentInfo getDocumentDetail(int docId) { String redisKey = "doc::info::" + docId; String obj = stringRedisTemplate.opsForValue().get(redisKey); DocumentInfo documentInfo = null; if (StrUtil.isNotEmpty(obj)) { //缓存命中 log.info("==== select from cache ===="); documentInfo = JSONUtil.toBean(obj, DocumentInfo.class); } else { log.info("==== select from db ===="); documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one(); if (ObjectUtil.isNotNull(documentInfo)) { // 缓存结果 stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS); } } return documentInfo; } } @GetMapping("/doc/queryById") public Result queryById(@RequestParam(name = "docId") Integer docId) { return Result.success(documentInfoService.getDocumentDetail(docId)); }

如果项目的并发量不大,这样写的话几乎没啥问题。如果项目的并发量很大,那么这就存在一个隐藏问题,如果在访问了一个不存在的文章(这个文章已经被分享出去,但是在后台可能是被删除或者下线状态),那么就会导致所有的请求全部需要到数据库中进行查询,从而给数据库造成压力,甚至造成宕机。 http://127.0.0.1:8081/doc/queryById?docId=不存在的id

2023-01-05 10:18:57.954 INFO 19692 --- [nio-8081-exec-8] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ==== 2023-01-05 10:18:58.121 INFO 19692 --- [nio-8081-exec-5] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ==== 2023-01-05 10:18:58.350 INFO 19692 --- [io-8081-exec-10] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ==== 2023-01-05 10:18:58.519 INFO 19692 --- [nio-8081-exec-3] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ==== 2023-01-05 10:18:58.661 INFO 19692 --- [nio-8081-exec-6] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ==== 2023-01-05 10:18:58.859 INFO 19692 --- [nio-8081-exec-4] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ==== 2023-01-05 10:18:59.012 INFO 19692 --- [nio-8081-exec-9] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ==== 2023-01-05 10:18:59.154 INFO 19692 --- [nio-8081-exec-7] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ==== 解决方案一:缓存空对象

针对缓存穿透问题缓存空对象可以有效避免所产生的影响,当查询一条不存在的数据时,在缓存中存储一个空对象并设置一个过期时间(设置过期时间是为了避免出现数据库中存在了数据但是缓存中仍然是空数据现象),这样可以避免所有请求全部查询数据库的情况。

// 查询对象不存在 if(StrUtil.equals(obj,"")){ log.info("==== select from cache , data not available ===="); return null; } if (StrUtil.isNotEmpty(obj)) { log.info("==== select from cache ===="); documentInfo = JSONUtil.toBean(obj, DocumentInfo.class); } else { log.info("==== select from db ===="); documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one(); //如果数据不存在,则缓存一个空对象并设置过期时间 stringRedisTemplate.opsForValue().set(redisKey, ObjectUtil.isNotNull(documentInfo)?JSONUtil.toJsonStr(documentInfo):"", 5L, TimeUnit.SECONDS); // if (ObjectUtil.isNotNull(documentInfo)) { // stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS); // } } 2023-01-05 13:15:01.057 INFO 16600 --- [nio-8081-exec-3] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from db ==== 2023-01-05 13:15:01.214 INFO 16600 --- [nio-8081-exec-4] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ==== 2023-01-05 13:15:01.384 INFO 16600 --- [nio-8081-exec-5] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ==== 2023-01-05 13:15:01.540 INFO 16600 --- [nio-8081-exec-6] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ==== 2023-01-05 13:15:01.720 INFO 16600 --- [nio-8081-exec-7] c.g.r.s.impl.DocumentInfoServiceImpl : ==== select from cache , data not available ==== 解决方案二:布隆过滤器

image.png

缓存空对象的缺点在于无论数据存不存在都需要查询一次数据库,并且redis中存储了大量的空数据,这个时候可以采用布隆过滤器来解决。布隆过滤器可以简单的理解为由一个很长的二进制数组结合n个hash算法计算出n个数组下标,将这些数据下标置为1。在查找数据时,再次通过n个hash算法计算出数组下标,如果这些下标的值为1,表示该值可能存在(存在hash冲突的原因),如果为0,则表示该值一定不存在。

/** * 布隆过滤器添加元素伪代码 */ BitArr[] bit = new BitArr[10000]; // 新建一个二进制数组 List insertData = Arrays.asList("A", "B", "C"); // 待添加元素 for (String insertDatum : insertData) { for (int i=1;i>> 32); for (int i = 1; i >> 32); for (int i = 1; i { return "doc::info::" + m.getId().intValue(); }).forEach(e->{ bloomFilterUtil.putBloomFilterRedis(e); }); } } }

上面全部搞定后,启动项目并测试结果是否有效,在启动前先在数据表中搞几条测试数据。

image.png

@Override public DocumentInfo getDocumentDetail(int docId) { String redisKey = "doc::info::" + docId; // 布隆过滤器中是否有可能存在这个key boolean b = bloomFilterUtil.existBloomFilterRedis(redisKey); if(!b){ // 如果不存在,直接返回空 log.info("==== select from bloomFilter , data not available ===="); return null; } String obj = stringRedisTemplate.opsForValue().get(redisKey); DocumentInfo documentInfo = null; if (StrUtil.isNotEmpty(obj)) { log.info("==== select from cache ===="); documentInfo = JSONUtil.toBean(obj, DocumentInfo.class); } else { log.info("==== select from db ===="); documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one(); if(ObjectUtil.isNotNull(documentInfo)){ stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS); } } return documentInfo; }

查询存在的数据。 image.png

查询不存在的数据。

image.png

查看两次请求打印的log,不存在的数据成功被拦截掉了,避免再去查询数据库,即使存在一定的误判率,也几乎不会有啥影响,最多就是查询一次数据库。

image.png

虽然布隆过滤器可以有效的解决缓存穿透问题,并且实现的算法查找效率也很快。但是,也存在一定的缺点,由于存在hash冲突的原因,一方面存在一定的误判率(某个在过滤器中并不存在的key,但是通过hash计算出来的下标值都为1)。另一方面,删除比较困难(如果将一个数组位置为0,那么这个位置有可能也代表其他key的值,会影响到其他的key)。

缓存击穿

缓存击穿是指访问某个热点数据时,缓存中并不存在该数据或者缓存过期了,这个时候全部的请求压力给到了数据库。

image.png

基于上面的代码,来模拟短时间内进行并发请求,看看会不会将请求全部打到数据库。

public static void main(String[] args) throws InterruptedException { /** * 短时间内并发请求接口,并访问同一个数据 */ ExecutorService executorService = Executors.newFixedThreadPool(1000); CountDownLatch countDownLatch = new CountDownLatch(1000); for (int i = 0; i < 1000; i++) { executorService.execute(() -> { HttpResponse response = HttpUtil.createGet("http://127.0.0.1:8081/doc/queryById?docId=1").execute(); System.out.println(response.body()); countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); }

根据日志输出结果显示,请求确实给到了数据库。 image.png 针对缓存击穿问题,有两种解决方案,一种是对热点数据不设置过期时间,另一种是采用互斥锁的方式。

解决方案一:热点数据不设置过期时间

热点数据不设置过期时间,当后台更新热点数据数需要同步更新缓存中的数据,这种解决方式适用于不严格要求缓存一致性的场景。

解决方案二:使用互斥锁

如果是单机部署的环境下可以使用synchronized或lock来处理,保证同时只能有一个线程来查询数据库,其他线程可以等待数据缓存成功后在被唤醒,从而直接查询缓存即可。如果是分布式部署,可以采用分布式锁来实现互斥。

@Component public class RedisLockUtil { @Resource private StringRedisTemplate stringRedisTemplate; /** * 模拟互斥锁 * @param key * @param value * @param exp * @return */ public boolean tryLock(String key, String value, long exp) { Boolean absent = stringRedisTemplate.opsForValue().setIfAbsent(key, value, exp, TimeUnit.SECONDS); if (absent) { return true; } return tryLock(key, value, exp); //如果线程没有获取锁,则在此处循环获取 } /** * 释放锁 * @param key * @param value */ public void unLock(String key, String value) { String s = stringRedisTemplate.opsForValue().get(key); if (StrUtil.equals(s, value)) { //避免锁被其他线程误删 stringRedisTemplate.delete(key); } } }

有了上面的两个方法,可以对业务代码进行改造,在查询数据库前进行加锁,读取完成后在释放锁。

@Override public DocumentInfo getDocumentDetail(int docId) { String redisKey = "doc::info::" + docId; boolean b = bloomFilterUtil.existBloomFilterRedis(redisKey); if (!b) { log.info("==== select from bloomFilter , data not available ===="); return null; } String obj = stringRedisTemplate.opsForValue().get(redisKey); DocumentInfo documentInfo = null; if (StrUtil.isNotEmpty(obj)) { log.info("==== select from cache ===="); documentInfo = JSONUtil.toBean(obj, DocumentInfo.class); } else { String s = UUID.randomUUID().toString(); //给锁加个标识,避免误删 String lockKey = redisKey+"::lock"; boolean lock = redisLockUtil.tryLock(lockKey, s, 60); //尝试加锁 if (lock) { try { //如果加锁成功,先再次查询缓存,有可能上一个线程查询并添加到缓存了 obj = stringRedisTemplate.opsForValue().get(redisKey); if (StrUtil.isNotEmpty(obj)) { log.info("==== select from cache ===="); documentInfo = JSONUtil.toBean(obj, DocumentInfo.class); } else { log.info("==== select from db ===="); documentInfo = this.lambdaQuery().eq(DocumentInfo::getId, docId).one(); if (ObjectUtil.isNotNull(documentInfo)) { stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L, TimeUnit.SECONDS); } } } finally { redisLockUtil.unLock(lockKey, s); //释放锁 } } } return documentInfo; }

一顿梭哈后,再次模拟并发查询,看看最终效果,理想的结果状态应该是查询一次数据库,后面的查询直接通过缓存获取。通过日志输出可以看出来,击穿问题被有效解决啦。 image.png

缓存雪崩

缓存雪崩是指对热点数据设置了相同的过期时间,在同一时间这些热点数据key大批量发生过期,请求全部转发到数据库,从而导致数据库压力骤增,甚至宕机。与缓存击穿不同的是,缓存击穿是单个热点数据过期,而缓存雪崩是大批量热点数据过期。

image.png

针对缓存雪崩问题,常见的解决方案有多种,比如设置随机的过期时间或者不设置过期时间,搭建高可用的缓存架构避免redis服务宕机,服务降级等。

解决方案一:设置随机的过期时间

将key的过期时间后面加上一个随机数,这个随机数值的范围可以根据自己的业务情况自行设定,这样可以让key均匀的失效,避免大批量的同时失效。

if (ObjectUtil.isNotNull(documentInfo)) { //生成一个随机数 int randomInt = RandomUtil.randomInt(2, 10); //过期时间+随机数 stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo), 5L+randomInt, TimeUnit.SECONDS); } 解决方案二:不设置过期时间

不设置过期时间时,需要注意的是,在更新数据库数据时,同时也需要更新缓存数据,否则数据会出现不一致的情况。这种方式比较适用于不严格要求缓存一致性的场景。

if (ObjectUtil.isNotNull(documentInfo)) { stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo)); } 解决方案三:搭建高可用集群

缓存服务故障时,也会触发缓存雪崩,为了避免因服务故障而发生的雪崩,推荐使用高可用的服务集群,这样即使发生故障,也可以进行故障转移。集群相关的在之前也有介绍过Redis高可用架构搭建到原理分析,这里就不再介绍了,如果想要理解,可以看这篇文章。

数据一致性

通常情况下,使用缓存的直接目的是为了提高系统的查询效率,减轻数据库的压力。一般情况下使用缓存是下面这几步骤:

查询缓存,数据是否存在 如果数据存在,直接返回 如果数据不存在,再查询数据库 如果数据库中数据存在,那么将该数据存入缓存并返回。如果不存在,返回空。

image.png

这么搞好像看上去并没有啥问题,那么会有一个细节问题:当一条数据存入缓存后,立刻又被修改了,那么这个时候缓存该如何更新呢。不更新肯定不行,这样导致了缓存中的数据与数据库中的数据不一致。一般情况下对于缓存更新有下面这几种情况:

先更新缓存,再更新数据库 先更新数据库,再更新缓存 先删除缓存,再更新数据库 先更新数据库,再删除缓存 先更新缓存,再更新数据库

先更新缓存,再更新数据库这种情况下,如果业务执行正常,不出现网络等问题,这么操作不会有啥问题,两边都可以更新成功。但是,如果缓存更新成功了,但是当更新数据库时或者在更新数据库之前出现了异常,导致数据库无法更新。这种情况下,缓存中的数据变成了一条实际不存在的假数据。 image.png 比如,在更新文章详情时,先修改了redis中的数据,在更新数据库前抛出一个异常来模拟数据库更新失败的场景。

public boolean updateDocument(DocumentInfo documentInfo) { String redisKey = "doc::info::" + documentInfo.getId(); // 先更新缓存 stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo)); // 模拟更新数据库前出现异常 int i = 1 / 0; // 在更新数据库 boolean b = this.updateById(documentInfo); return b; } @PostMapping("/doc/updateDocument") public Result updateDocument(@RequestBody DocumentInfo documentInfo) { return Result.success(documentInfoService.updateDocument(documentInfo)); }

先调用更新的接口,在调用查询的接口,结果发现查询的接口返回的并不是数据库中的实际数据,这个时候就造成了缓存与数据库数据不一致的情况。

image.png

image.png

image.png

先更新数据库,再更新缓存

先更新数据库,再更新缓存和先更新缓存,再更新数据库的情况基本一致,如果失败,会导致数据库中是最新的数据,缓存中是旧数据。还有一种极端情况,在高并发情况下容易出现数据覆盖的现象:A线程更新完数据库后,在要执行更新缓存的操作时,线程被阻塞了,这个时候线程B更新了数据库并成功更新了缓存,当B执行完成后线程A继续向下执行,那么最终线程B的数据会被覆盖。

image.png

@Override public boolean updateDocument(DocumentInfo documentInfo) { String redisKey = "doc::info::" + documentInfo.getId(); // 更新数据库 boolean b = this.updateById(documentInfo); //以标题为标识,模拟线程阻塞。当一个请求的标题为‘模拟数据覆盖’时,线程停4秒 if(StrUtil.equals(documentInfo.getTitle(),"模拟数据覆盖")){ try { Thread.sleep(4000); }catch (Exception e){ } } // 更新缓存 stringRedisTemplate.opsForValue().set(redisKey, JSONUtil.toJsonStr(documentInfo)); // 模拟更新数据库前出现异常 return b; } 先删除缓存,再更新数据库

先删除缓存,再更新数据库这种情况,如果并发量不大用起来不会有啥问题。但是在并发场景下会有这样的问题:线程A在删除缓存后,在写入数据库前发生了阻塞。这时线程B查询了这条数据,发现缓存中不存在,继而向数据库发起查询请求,并将查询结果缓存到了redis。当线程B执行完成后,线程A继续向下执行更新了数据库,那么这时缓存中的数据为旧数据,与数据库中的值不一致。

image.png

先更新数据库,再删除缓存

先更新数据库,再删除缓存也并不是绝对安全的,在高并发场景下,如果线程A查询一条在缓存中不存在的数据(这条数据有可能过期被删除了),查询数据库后在要将查询结果缓存到redis时发生了阻塞。这个时候线程B发起了更新请求,先更新了数据库,再次删除了缓存。当线程B执行成功后,线程A继续向下执行,将查询结果缓存到了redis中,那么此时缓存中的数据与数据库中的数据发生了不一致。

image.png

解决数据不一致方案 延时双删

延时双删,即在写数据库之前删除一次,写完数据库后,再删除一次,在第二次删除时,并不是立即删除,而是等待一定时间在做删除。

image.png

这个延时的功能可以使用mq来实现,这里为了省事,偷个懒,本地测试使用的延时队列来模拟mq达到延时效果。首先需要定义一个队列元素对象DoubleDeleteTask。

@Data public class DoubleDeleteTask implements Delayed { private String key; // 需要删除的key private long time; //需要延迟的时间 public DoubleDeleteTask(String key, long time) { this.key = key; this.time = time + System.currentTimeMillis(); } @Override public long getDelay(TimeUnit unit) { return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return Long.compare(time, ((DoubleDeleteTask) o).time); } }

然后定义一个队列并交给spring管理。

@Configuration public class DoubleDeleteQueueConfig { @Bean(name = "doubleDeleteQueue") public DelayQueue doubleDeleteQueue() { return new DelayQueue(); } }

设置一个独立线程,特意用来处理延时的任务。如果数据删除失败,可以自定义重试次数以保证数据的一致性,但是也会带来一定的性能影响,如果在实际项目中,建议还是以异步的方式来实现重试。

@Slf4j @Component public class DoubleDeleteTaskRunner implements CommandLineRunner { @Resource private DelayQueue doubleDeleteQueue; @Resource private StringRedisTemplate stringRedisTemplate; private static final int retryCount = 3; //失败重试次数 @Override public void run(String... args) throws Exception { new Thread(() -> { try { while (true) { DoubleDeleteTask take = doubleDeleteQueue.take(); //取出队列元素 String key = take.getKey(); try { stringRedisTemplate.delete(key); log.info("====延时删除key:{}====",key); } catch (Exception e) { //失败重试 int count = 1; for (int i = 1; i


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有